feat: begin laying out things for go worker #5275
michaelkedar wants to merge 8 commits into google:master
Conversation
```go
}

func (s *Subscriber) Run(ctx context.Context) error {
	return s.PubSubSub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
```
By the way, you can configure parallelism on this pubsub.Subscriber, so we wouldn't have to manually manage a worker pool.
Do you have an example of how this is done? Does it just call the Receive function in multiple goroutines?
There's Subscriber.ReceiveSettings.MaxOutstandingMessages to configure the number of messages (and therefore goroutines) to be processed at once.
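For reference, a minimal sketch of what that could look like, assuming `PubSubSub` is a Pub/Sub subscriber from the cloud.google.com/go/pubsub client (the limit value is illustrative):

```go
// Sketch only: set before calling Receive. The client library then handles
// up to MaxOutstandingMessages messages concurrently, each callback in its
// own goroutine, so we don't manage a worker pool ourselves.
s.PubSubSub.ReceiveSettings.MaxOutstandingMessages = 16
return s.PubSubSub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
	// ... handle the message, then Ack/Nack ...
	m.Ack()
})
```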
```go
			return
		}
		buf := make([]byte, 0, len(m.Data)*3) // let's guess 3x compression
```
zstd probably has the full size in the encoded data's metadata. It's apparently an optional header, but we can make sure we write it on the encoding end.
Actually, I had a quick look at how to do this; it seems kind of annoying, so I'll probably just add a TODO here and leave it.
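For what it's worth, a sketch of the decoding side, assuming the github.com/klauspost/compress/zstd package (whether the frame content size is present still depends on the encoder):

```go
// Sketch: as I understand it, DecodeAll reads the optional frame-header
// content size when the encoder wrote one and sizes its output from that,
// so the 3x guess would only matter for frames without the header.
dec, err := zstd.NewReader(nil)
if err != nil {
	return err
}
defer dec.Close()
data, err := dec.DecodeAll(m.Data, nil)
```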
```go
			slog.String("source", task.SourceID),
			slog.String("path", task.PathInSource),
		}
		if len(m.Data) != 0 {
```
Can we add some newlines and comments to break up this function a bit? (Or, probably better, split it into separate functions.)
Should we just call this file engine?

Renamed this to engine; moved some other things to what once was interfaces and renamed that to worker 🙂
```go
	} else if err != nil {
		logger.ErrorContext(ctx, "Failed to get current vuln state", slog.String("vuln_id", enriched.GetId()), slog.Any("error", err))
		return fmt.Errorf("failed to get current vuln state: %w", err)
	} else if e.isSemanticallyDifferent(current, enriched) {
```
nit: remove the else since it's already returning
This still needs the else since the first if doesn't return.
Maybe this would be neater as a switch statement?
Didn't make it a switch, but changed the logic to be a bit easier to follow.
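For illustration, here's the switch shape that was floated: each outcome of the lookup becomes one case, and no branch needs an else. All names below (`getCurrent`, `decide`, the sentinel error, the string comparison standing in for `isSemanticallyDifferent`) are stand-ins, not the PR's actual API:

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound and getCurrent stand in for the real datastore lookup.
var errNotFound = errors.New("vuln not found")

func getCurrent(id string) (string, error) {
	if id == "CVE-NEW" {
		return "", errNotFound
	}
	return "old-state", nil
}

// decide maps each lookup outcome to one switch case.
func decide(id, enriched string) (string, error) {
	current, err := getCurrent(id)
	switch {
	case errors.Is(err, errNotFound):
		return "create", nil // no existing record: write a new one
	case err != nil:
		return "", fmt.Errorf("failed to get current vuln state: %w", err)
	case current != enriched: // stand-in for isSemanticallyDifferent
		return "update", nil
	default:
		return "skip", nil // unchanged: nothing to write
	}
}

func main() {
	for _, c := range [][2]string{
		{"CVE-NEW", "x"},
		{"CVE-OLD", "new-state"},
		{"CVE-OLD", "old-state"},
	} {
		action, _ := decide(c[0], c[1])
		fmt.Println(action) // create, then update, then skip
	}
}
```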
```go
	}
	if task.Vuln == nil {
		// TODO: Download Vuln from source
		return errors.New("vuln not provided")
```
I think returning the error is the correct move. We shouldn't try to download the vuln in the worker; that should be the importer's job.
I guess I'm worried about the compressed vuln being too big to send over pub/sub (which, admittedly, would have to be really big), and also that if we ever want to request an update to a record from a datafix tool or something, we'd need to download it there as well.
Hmm, I still think the vuln should be sent along by the datafix tool, right?
For the really big records, I think that's something we should solve with a side channel to pass the vuln along. It's going to get really messy and buggy if we start adding logic for fetching the record from upstream into the worker. It's probably not something we need to worry about for now: with zstd compression, a record would have to be truly monstrous for this to be a problem, and one that big would likely cause other breakages as well.
```go
var _ worker.Enricher = (*SourceLinkAdder)(nil)

func (*SourceLinkAdder) Enrich(_ context.Context, vuln *osvschema.Vulnerability, params *worker.EnrichParams) error {
```
From the package comment, I think this is likely intended as an example rather than the full complement of enrichers? Perhaps say so explicitly in a comment.
Moved some things around and moved each enricher to its own package, so that the package comments are more specific to the package.
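To make the pattern concrete, here's a self-contained sketch of the interface-plus-compile-time-assertion shape used above. The types are simplified stand-ins (the real code takes a context.Context and osvschema protos; the `source_link` key and field names here are illustrative):

```go
package main

import "fmt"

// Vulnerability and EnrichParams are simplified stand-ins for the worker
// package's types.
type Vulnerability struct {
	DatabaseSpecific map[string]string
}

type EnrichParams struct {
	Source string // e.g. "debian"
	Path   string // e.g. "CVE-2023.json"
}

// Enricher is one step in the pipeline; each implementation populates some
// set of fields on the record.
type Enricher interface {
	Enrich(vuln *Vulnerability, params *EnrichParams) error
}

// SourceLinkAdder mirrors the example enricher in the PR.
type SourceLinkAdder struct{}

// Compile-time assertion that SourceLinkAdder satisfies Enricher, matching
// the `var _ worker.Enricher = ...` line above.
var _ Enricher = (*SourceLinkAdder)(nil)

func (*SourceLinkAdder) Enrich(vuln *Vulnerability, params *EnrichParams) error {
	if vuln.DatabaseSpecific == nil {
		vuln.DatabaseSpecific = make(map[string]string)
	}
	vuln.DatabaseSpecific["source_link"] = params.Source + "/" + params.Path
	return nil
}

func main() {
	v := &Vulnerability{}
	enrichers := []Enricher{&SourceLinkAdder{}} // pipeline of one, for the demo
	for _, e := range enrichers {
		if err := e.Enrich(v, &EnrichParams{Source: "debian", Path: "CVE-2023.json"}); err != nil {
			panic(err)
		}
	}
	fmt.Println(v.DatabaseSpecific["source_link"]) // debian/CVE-2023.json
}
```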
```go
	Vulnerability models.VulnerabilityStore
}

type EnrichParams struct {
```
Are these the only things that would be needed for any possible enricher? If not, would you expect that we'd include all possible types as (optional, I guess) fields here?
This I don't have a great design for.
This struct should contain everything needed across all enrichers, so if a new enricher needs something else, we'd have to add it here and populate it.
I guess technically all the fields are optional (if a specific enricher doesn't need them), but practically we'd be running every enricher, so all the fields become required.
```go
	Source    string                   // The source name (e.g. "debian")
	Path      string                   // The relative path in the source (e.g. "CVE-2023.json")
	Raw       *osvschema.Vulnerability // The original input proto
	Processed *osvschema.Vulnerability // The final enriched proto
```
Elsewhere you seem to have called this "enriched". Is the difference meaningful?
Processed felt a bit more generic, but I can change it to enriched for consistency.
Very much not complete, but think of this as a design doc without the doc part.
The basic idea is to have a bunch of Enrichers in charge of populating fields in the records as the worker currently does, but in a more concrete and modular pipeline.
I've implemented the thing that adds the source link into database_specific as an example, but Enrichers will include version enumeration, PURL generation, etc.
Affected commit computation and enumeration will happen after the enricher pipeline, since it needs to return the list of commits that are not part of the OSV record.
I've made two separate structs for the pub/sub subscriber/parser and the actual vuln processing struct (the 'engine') which hopefully makes it less coupled to pub/sub.
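The subscriber/engine split described above can be sketched roughly like this. All names (`Task`, `Engine.Process`, `Subscriber.handle`, the JSON field names) are illustrative stand-ins, not the PR's actual API:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Task is a stand-in for the pub/sub payload.
type Task struct {
	SourceID     string `json:"source_id"`
	PathInSource string `json:"path_in_source"`
}

// Engine owns vuln processing and knows nothing about pub/sub.
type Engine struct{}

func (Engine) Process(t Task) error {
	fmt.Printf("processing %s/%s\n", t.SourceID, t.PathInSource)
	return nil
}

// Subscriber owns the transport: it parses raw message bytes and hands a
// typed Task to the engine, so the engine stays testable without pub/sub.
type Subscriber struct {
	Engine Engine
}

func (s Subscriber) handle(data []byte) error {
	var t Task
	if err := json.Unmarshal(data, &t); err != nil {
		return fmt.Errorf("bad task payload: %w", err)
	}
	return s.Engine.Process(t)
}

func main() {
	s := Subscriber{}
	if err := s.handle([]byte(`{"source_id":"debian","path_in_source":"CVE-2023.json"}`)); err != nil {
		panic(err)
	}
}
```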